iT邦幫忙

2022 iThome 鐵人賽

DAY 8
0
Modern Web

身為 Node.js 開發者,可以知道一下的事系列 第 8

[Day 8] 利用 Task Queue 分散式處理非同步任務 - Bull

  • 分享至 

  • xImage
  •  

今天有一種情境,假設我們需要去匹次處理大量任務,每個任務都可能會有發生 error 的風險(資料錯誤或是暫時性第三方錯誤或是任何非預期原因)

例如: 假設我們今天有十萬個商品,都需要透過 API 去抓取他們的資料,然後根據情況進行某種商品資料更新,那一般會如何處理呢?


  • 一種比較笨的做法,是直接跑一個 for 或 while 迴圈,分別抓這些商品的資料並且更新。這種情況下,如果第三方服務臨時出問題呢? 我們可能會在迴圈中去用 try...catch 去捕獲這些 error,並且 retry 這些任務。然而在錯誤處理這塊,做法會相當的笨拙,並且也難以管理這些任務的進度。
  • 我們可能可以將這些 product 的更新任務放進資料庫中(Mongo, SQL, Redis...),然後依序去從這些資料中拉出任務出來執行,依次更新這些商品,中途若是發生錯誤,則進行 log 紀錄及 retry。

第二種方式看似可以解決這個問題,但今天任務一多時,如果只有一個 instance 在處理這些任務,則處理效率會十分緩慢。然而若是開多個 instance 來分擔這些任務時,則需要擔心 race condition 的問題,若是沒處理好,可能會導致任務被重複執行的錯誤狀況。
並且針對 error 的任務,事後若要重新執行,也相對麻煩。

Task Queue (Message Queue) 可以幫助我們解決上述的問題,並且是十分容易擴展服務的方式,而本篇介紹的 Bull 便是 node 生態系中一種 task queue 的解決方案。


Bull (官網文件)

什麼是 bull 呢?我們先看一下官網的介紹:

The fastest, most reliable, Redis-based queue for Node.
Carefully written for rock solid stability and atomicity.

簡單說,bull 是一種基於 redis 的 queue,可以幫助我們解決剛剛以上的問題,類似的工具有非常多,例如 RabitMQ 或 AWS 的 SQS 等。
透過使用 bull,我們可以在 task queue 中加入我們想要處理的任務。
以上述例子為例,則為 product ids。然後任務將分別被派發給處理這些任務的對象,簡稱 processor 或是 consumer。

值得開心的是,processor 可以是複數個,言下之意是,我們可以由多個 processors 來消耗及處理我們的任務,這些 processors 不只可以是位於不同主機的 instances,並且 bull 會協助我們派發這些任務,避免任務被重複執行。

另外,task queue 還能幫助我們設定每個任務之間的延遲、retry 次數、rate limite 等。暫停任務跟刪除任務也是可以做到的,使我們的任務變得更可掌控。


另外,在這邊推薦一個 bull 的 admin dashboard,目前在工作專案已使用幾週,可視化的介面非常親民方便,並且可以方便地重啟失敗的任務。
https://github.com/felixmosh/bull-board

另外,對 task queue 有興趣深入瞭解的,可以參考這篇莫力全大大的文章

結語

Task queue 是高併發架構常見的其中一個解決方案,可以幫助我們有條不亂的處理大量非同步任務,並且使系統更容易被管理跟追蹤,配合 node 搭配 pm2 的使用,更是如虎添翼,可以在近乎無痛的情況下開啟多個 consumers 來同時處理任務。

案例分享

以下是我使用 bulljs 配合上一篇的 node-cron 設計的一個管理 cron job 的 class,任務統一存在 redis 做管理,如果是以 datetime 設定,則會被視為一次性任務,執行後就會刪除。

import { CronJob } from 'cron';
import moment from 'moment-timezone';
import mainQueue from '../queue/queue';
import { redis } from './redis';

class CronManager {
  private jobs: CronJob[] = [];
  private redis_prefix = process.env.SITE_PREFIX;

  constructor() {
    this.refreshJobs();
  }

  async refreshJobs() {
    const allJobs = await this.getCronJobList();
    this.jobs.forEach(job => {
      job.stop();
    });
    this.jobs = allJobs
      .map(job => {
        const { id, action, payload, type, value } = job;
        const cron = type === 'DATETIME' ? moment(value).toDate() : value;
        try {
          return new CronJob(
            cron,
            () => {
              const data = {
                action,
                payload: payload,
              };
              if (type === 'DATETIME') {
                this.deleteCronJob(id);
              }
              mainQueue.add('main', data);
            },
            null,
            true,
            'Asia/Taipei',
          );
        } catch (error) {
          return null;
        }
      })
      .filter(item => item !== null);
  }
  async getCronJobList() {
    const all = await redis.get(`${this.redis_prefix}_conjob`);
    return all ? JSON.parse(all) : [];
  }

  async deleteCronJob(jobId: number) {
    const all = await this.getCronJobList();
    const newAll = all.filter(({ id }) => {
      return Number(id) !== Number(jobId);
    });
    await redis.set(`${this.redis_prefix}_conjob`, JSON.stringify(newAll));
    this.refreshJobs();
  }
}

export default new CronManager();

上一篇
[Day 7] 設定排程任務的工具 - node-cron
下一篇
[Day 9] 開發後端 API 的工具 - Web frameworks
系列文
身為 Node.js 開發者,可以知道一下的事9
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言